/*************************************************************************
> File Name: crew.c
> Author: 
> Mail: 
> Created Time: 2015年01月09日 星期五 14时46分27秒
************************************************************************/

#include<sys/types.h>
#include<pthread.h>
#include<sys/stat.h>
#include<dirent.h>
#include"errors.h"

#define CREW_SIZE 4

typedef struct work_tag{
    struct work_tag     *next;
    char                *path;      //Directory or file
    char                *string;    //Search string
}work_t,*work_p;

typedef struct worker_tag{
    int                 index;      //thread's index
    pthread_t           thread;     //thread for stage
    struct crew_tag     *crew;      //Pointer to crew
}worker_t,*worker_p;

typedef struct crew_tag{
    int                 crew_size;      // size of array
    worker_t            crew[CREW_SIZE];// crew memebers
    long                work_count;     // count of work items
    work_p              first,last;     //First &last data
    pthread_mutex_t     mutex;          
    pthread_cond_t      done,go;        //wait for crew done or go
}crew_t,*crew_p;

size_t path_max;        //filepath length
size_t name_max;        //name length

// the thread start routine for crew threads.waits unitl "go" command,
// processes work items until requested to shut down
void *worker_routine(void *arg){
    worker_p mine = (worker_p)arg;
    crew_p crew = mine->crew;
    work_p work,new_work;

    struct stat filestat;
    struct dirent *entry;
    int status;

    entry = (struct dirent*)malloc(sizeof(struct dirent) + name_max);
    if(NULL == entry) errno_abort("allocate dirent");

    if((status = pthread_mutex_lock(&crew->mutex))) err_abort(status,"lock crew mutex");

    while( ! crew->work_count){
        if((status = pthread_cond_wait(&crew->go,&crew->mutex))){
            err_abort(status,"Wait for go");
        }
    }

    if((status = pthread_mutex_unlock(&crew->mutex))) err_abort(status,"unlock crew mutex");

    DPRINTF(("Crew %d starting\n",mine->index));

    while(1){
        /*
        * wait for while if nothing to do ,and if crew->first is NULL
        * there is no work,but if crew->work_count goes to zero,done!
        */

        if((status = pthread_mutex_lock(&crew->mutex))){
            err_abort(status,"Lock crew mutex");
        }

        DPRINTF(("Crew %d top: first is %#x,count is %d\n",
                 mine->index,crew->first,crew->work_count));
        while(crew->first == NULL){
            if((status = pthread_cond_wait(&crew->go,&crew->mutex))){
                err_abort(status,"Wait for work");
            }
        }
        DPRINTF(("Crew %d work :%s : %#lx,%ld\n",mine->index,crew->first->path,crew->first,crew->work_count));

        work = crew->first;
        crew->first = work->next;
        if(crew->first == NULL) crew->last = NULL;

        DPRINTF(("Crew %d top %#lx, leaves first %#lx,last %#lx\n",
                 mine->index,work,crew->first,crew->last));

        if((status = pthread_mutex_unlock(&crew->mutex))){
            err_abort(status,"unlock crew mutex");
        }

        // process work item
        status = lstat(work->path,&filestat);

        if(S_ISLNK(filestat.st_mode)){
            printf("Thread id :%d:%s is a link ,skipping \n",
                   mine->index,work->path);
        }else if(S_ISDIR(filestat.st_mode)){
            DIR *directory;
            struct dirent *result;

            directory = opendir(work->path);
            if(directory == NULL){
                fprintf(stderr,"Unable to open directory %s: %d(%s)\n",
                        work->path,errno,strerror(errno));
                continue;
            }

            while(1){
                status = readdir_r(directory,entry,&result);
                if(status != 0){
                    fprintf(stderr,
                            "Unable to read directory %s:%d(%s)\n",
                            work->path,
                            status,strerror(status));
                    break;
                }
                if(result == NULL) break;
                // ignore . and .. directory
                if(!strcmp(entry->d_name,".")) continue;
                if(!strcmp(entry->d_name,"..")) continue;

                new_work = (work_p)malloc(sizeof(work_t));
                if(NULL == new_work) errno_abort("unable allocate space");

                new_work->path = (char*)malloc(path_max);
                if(NULL == new_work->path) errno_abort("unable allocate path");

                strcpy(new_work->path,work->path);
                strcat(new_work->path,"/");
                strcat(new_work->path,entry->d_name);

                new_work->string = work->string;
                new_work->next = NULL;
                if((status = pthread_mutex_lock(&crew->mutex))){
                    err_abort(status,"Lock crew mutex");
                }
                if(crew->first == NULL){
                    crew->first = new_work;
                    crew->last = new_work;
                }else{
                    crew->last->next = new_work;
                    crew->last = new_work;
                }
                crew->work_count++;
                DPRINTF(("Crew %d add %s,first %#lx,last %#lx,%d\n",
                         mine->index,new_work->path,crew->first,
                         crew->last,crew->work_count));

                status = pthread_cond_signal(&crew->go);
                status = pthread_mutex_unlock(&crew->mutex);
                if(status) err_abort(status,"unlock mutex");
            }
            closedir(directory);
        }else if(S_ISREG(filestat.st_mode)){
            FILE *search;
            char buffer[256],*buffer_ptr,*search_ptr;

            search = fopen(work->path,"r");
            if(search == NULL) fprintf(stderr,"Unable to open %s :%d(%s)\n",
                                      work->path,errno,strerror(errno));
            else{
                while(1){
                    buffer_ptr = fgets(buffer,sizeof(buffer),search);
                    if(buffer_ptr == NULL){
                        if(feof(search)) break;
                        if(ferror(search)) {
                            fprintf(stderr,"Unable to read %s :%d(%s)\n",
                            work->path,errno,strerror(errno));
                            break;
                        }    
                    }
                    search_ptr = strstr(buffer,work->string);
                    if(search_ptr != NULL){
                        printf("Thread %d found \"%s\" in %s\n",mine->index,
                              work->string,work->path);
                        break;
                    }
                }
                fclose(search);
            }
        }else fprintf(
            stderr,
            "Thread %d :%s is type to %o(%s)\n",
            mine->index,
            work->path,
            filestat.st_mode & S_IFMT,
            (S_ISFIFO(filestat.st_mode) ? "FIFO"
             :(S_ISCHR(filestat.st_mode) ? "CHR"
               :(S_ISBLK(filestat.st_mode)? "BLK"
                 :(S_ISSOCK(filestat.st_mode)? "SOCK"
                   :"unknow type"))))
            );
        free(work->path);
        free(work);
        
        //lock crew and decrement count of work items
        if((status = pthread_mutex_lock(&crew->mutex))){
            err_abort(status,"lock crew mutex"); 
        }

        crew->work_count--;
        DPRINTF(("Crew %d decrement work to %d\n",mine->index,
                crew->work_count));
        if(crew->work_count <= 0){
            DPRINTF(("Crew thread %d done\n",mine->index));
            if((status = pthread_cond_broadcast(&crew->done))){
                err_abort(status,"Wake waiters");
            } 
            if((status = pthread_mutex_unlock(&crew->mutex))){
                err_abort(status,"Unlock mutex");
            }
            break;
        }
        status = pthread_mutex_unlock(&crew->mutex); 
    }
    free(entry);
    return NULL;
}

int crew_create(crew_p crew,int crew_size){
    int crew_index;
    int status;

    if(crew_size > CREW_SIZE) return EINVAL;
    crew->crew_size = crew_size;
    crew->work_count = 0;
    crew->first = crew->last = NULL;

    if((status = pthread_mutex_init(&crew->mutex,NULL)))
        return status;

    if((status = pthread_cond_init(&crew->done,NULL)))
        return status;

    if((status = pthread_cond_init(&crew->go,NULL)))
        return status;

    // create worker thread
    // The crew is a group of workers.the workers in crew[crew_index]
    // every worker share the work items in crew. worker 's crew point 
    // to the shared crew.Every workers has it's own thread to deal with
    // the shared crew 's work items 
    for(crew_index = 0; crew_index < CREW_SIZE;crew_index++){
        crew->crew[crew_index].index = crew_index;
        crew->crew[crew_index].crew = crew;
        if((status = pthread_create(&crew->crew[crew_index].thread,
                                   NULL,worker_routine,(void*)&crew->crew[crew_index])))
            err_abort(status,"create worker");
    }
    return 0;
}

int crew_start(crew_p crew,char* filepath,char *search){
    work_p request;
    int status;

    if((status = pthread_mutex_lock(&crew->mutex))){
        return status;
    }

    while(crew->work_count > 0){
        status = pthread_cond_wait(&crew->done,&crew->mutex);
        if(status){
            pthread_mutex_unlock(&crew->mutex);
            return status;
        }
    }
    errno = 0;
    path_max = pathconf(filepath,_PC_PATH_MAX);
    if(path_max == -1){
        if(errno == 0) path_max = 1024;
        else errno_abort("Unable to get PATH_MAX");
    }
    errno = 0;
    name_max = pathconf(filepath,_PC_NAME_MAX);
    if(name_max == -1){
        if(errno == 0) name_max = 256;
        else errno_abort("Unable to get NAME_MAX");
    }

    DPRINTF(("PATH_MAX for %s is %d, NAME_MAX is %ld\n",
           filepath,path_max,name_max));

    //add null byte
    path_max++;
    name_max++;
    

    request = (work_p)malloc(sizeof(work_t));
    if(NULL == request) errno_abort("Unable to allocate request");
    DPRINTF(("requesting %s\n",filepath));
    request->path = (char*)malloc(path_max);
    if(NULL == request->path) errno_abort("unable to allocate path");
    strcpy(request->path,filepath);
    request->string = search;
    request->next = NULL;

    if(crew->first == NULL){
        crew->first = request;
        crew->last  = request;
    }else{
        crew->last->next = request;
        crew->last       = request;
    }

    crew->work_count++;
    status = pthread_cond_signal(&crew->go);
    if(status){
        free(crew->first);
        crew->first = NULL;
        crew->work_count = 0;
        pthread_mutex_unlock(&crew->mutex);
        return status;
    }
    while(crew->work_count > 0){
        if((status = pthread_cond_wait(&crew->done,&crew->mutex))){
            err_abort(status,"waiting for crew to finish");
        }
    }
    if((status = pthread_mutex_unlock(&crew->mutex))){
        err_abort(status,"Unlock crew lock");
    }
    return 0; 
}

int main(int argc,char*argv[]){
    crew_t my_crew;
    char line[128], *next;
    int status;

    if(argc < 3){
        fprintf(stderr,"Usage : %s string path\n",argv[0]);
    }

    if((status = crew_create(&my_crew,CREW_SIZE))){
        err_abort(status,"Create crew");
    }
    if((status = crew_start(&my_crew,argv[2],argv[1]))){
        err_abort(status,"Start crew");
    }
    return 0;
}
